Source code for hysop.fields.ghost_exchangers

# Copyright (c) HySoP 2011-2024
# This file is part of HySoP software.
# See ""
# for further info.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.

from abc import ABCMeta, abstractmethod
import hashlib
import numpy as np

from import check_instance, to_tuple, first_not_None
from import default_invalid_value
from import iter_mpi_requests, dtype_to_mpi_type
from hysop.constants import (
from hysop.topology.topology import Topology, TopologyView
from hysop.topology.cartesian_topology import CartesianTopologyView
from hysop.core.arrays.all import HostArray
from hysop.core.mpi import MPI
from hysop.core.mpi.topo_tools import TopoTools
from hysop.testsenv import __HAS_OPENCL_BACKEND__

    from hysop.core.arrays.all import OpenClArray
    from hysop.backend.device.opencl import cl, clArray
    from hysop.backend.device.opencl.opencl_kernel_launcher import HostLauncherI
from hysop.constants import BoundaryCondition, BoundaryConditionConfig


[docs] def gprint(*args, **kwds): level = kwds.pop("level", 2) if GHOST_EXCHANGE_DEBUG_LEVEL >= level: print(*args, **kwds)
[docs] def gprint_buffer(msg, buf, *args, **kwds): no_data = kwds.pop("no_data", False) if isinstance(buf, list): mpi_type = buf[1] buf = buf[0] else: mpi_type = None gprint( "{}: mpi_type={}, shape={}, dtype={}, c_contiguous={}, f_contiguous={}".format( msg, mpi_type, buf.shape, buf.dtype, buf.flags.c_contiguous, buf.flags.f_contiguous, ) ) if no_data: return gprint(" " + "\n ".join(str(buf).split("\n")))
[docs] class LocalBoundaryExchanger: """ Helper class to generate symmetric and antisymmetric local ghost exchangers. This is used for non-periodic boundary conditions: HOMOGENEOUS_DIRICHLET: antisymmetric ghost exchange HOMOGENEOUS_NEUMANN: symmetric ghost exchange """
[docs] @classmethod def build_exchanger(cls, shape, direction, H, to_left): shape = to_tuple(shape) assert isinstance(direction, int) assert isinstance(H, tuple) ndim = len(shape) S = shape[direction] G = (S - 1) // 2 H = np.asarray(H).copy() assert direction < ndim assert S % 2 == 1 assert G > 0 assert H.size == G + 1 def mk_slc(*args, **kwds): if "extend" in kwds: slices = [np.newaxis] * ndim else: slices = [slice(None, None)] * ndim slices[direction] = slice(*args) return tuple(slices) if to_left: src_slc = mk_slc(G, S, +1) dst_slc = mk_slc(0, G + 1, +1) islc = mk_slc(None, None, None) oslc = mk_slc(None, None, -1) else: src_slc = mk_slc(0, G + 1, +1) dst_slc = mk_slc(G, S, +1) islc = mk_slc(None, None, -1) oslc = mk_slc(None, None, None) hslc = mk_slc(0, H.size, 1, extend=True) H = H[hslc] def exchange_ghosts(X): assert X is not None assert X.shape == shape X[dst_slc][oslc] = H * X[src_slc][islc] return exchange_ghosts
[docs] @classmethod def build_symmetric_exchanger(cls, shape, direction, to_left): S = (shape[direction] + 1) // 2 H = (1,) * S return cls.build_exchanger( shape=shape, direction=direction, H=H, to_left=to_left )
[docs] @classmethod def build_antisymmetric_exchanger(cls, shape, direction, to_left): S = (shape[direction] + 1) // 2 H = (0,) + (-1,) * (S - 1) return cls.build_exchanger( shape=shape, direction=direction, H=H, to_left=to_left )
[docs] @classmethod def build_scalar_exchanger(cls, value, shape, direction, to_left=None): if to_left is None: def exchange_ghosts(X, value=value): assert X is not None X[...] = value else: ndim = len(shape) def mk_slc(*args, **kwds): if "extend" in kwds: slices = [np.newaxis] * ndim else: slices = [slice(None, None)] * ndim slices[direction] = slice(*args) return tuple(slices) S = shape[direction] G = (S - 1) // 2 if to_left: dst_slc = mk_slc(0, G + 1, +1) else: dst_slc = mk_slc(G, S, +1) def exchange_ghosts(X, value=value): assert X is not None X[dst_slc] = value return exchange_ghosts
[docs] class GhostExchangerI(metaclass=ABCMeta): """Abstract interface for a ghost exchanger."""
[docs] @abstractmethod def exchange_ghosts(self, **kwds): pass
def __call__(self, **kwds): return self.exchange_ghosts(**kwds)
[docs] class MultiGhostExchanger(GhostExchangerI): """Handle multiple ghost exchangers.""" def __init__(self, name): = name self.kind = None self._exchangers = () self._launcher = None def __iadd__(self, other): assert self._launcher is None, "launcher already built." if other is None: return self if isinstance(other, GhostExchanger): if other.kind is None: return if self.kind is None: self.kind = other.kind assert self.kind == other.kind self._exchangers += (other,) elif isinstance(other, MultiGhostExchanger): if other.kind is None: return assert self.kind == other.kind self._exchangers += other._exchangers else: msg = f"Uknown type {type(other)}." raise TypeError(msg) return self
[docs] def exchange_ghosts(self, **kwds): """Exchange ghosts on specified data.""" assert self._exchangers if self._launcher is None: self._build_launcher() assert self._launcher is not None if self.kind is Backend.OPENCL: kwds.setdefault("queue", self._exchangers[0].cl_env.default_queue) return self._launcher(**kwds)
def _build_launcher(self): if self._launcher is not None: return self._launcher elif not self._exchangers: return None for g in self._exchangers: g._build_launcher() if self.kind is Backend.HOST: def _launcher(**kwds): for g in self._exchangers: g._launcher(**kwds) self._launcher = _launcher elif self.kind is Backend.OPENCL: from hysop.backend.device.opencl.opencl_kernel_launcher import ( OpenClKernelListLauncher, ) name = f"multighost_exchanger_{}" kl = OpenClKernelListLauncher(name=name) for g in self._exchangers: kl += g._launcher self._launcher = kl else: msg = f"Unknown array backend kind {self.kind}." raise NotImplementedError(msg) assert self._launcher is not None return self._launcher
[docs] class GhostExchanger(GhostExchangerI): """Prepare a backend specific ghost exchange, possibly on multiple data.""" def __init__(self, name, topology, data, exchange_method, ghost_op, ghost_mask): check_instance(name, str) check_instance(topology, TopologyView) check_instance(data, tuple, minsize=1) check_instance(exchange_method, ExchangeMethod) check_instance(ghost_op, GhostOperation) check_instance(ghost_mask, GhostMask) dtype = data[0].dtype for d in data[1:]: if d.dtype != dtype: msg = "All data should have the same dtype, got {} and {}." msg = msg.format(dtype, d.dtype) raise RuntimeError(msg) base_dtype = dtype base_mpi_type = dtype_to_mpi_type(dtype) self.topology = topology = data self.mesh = topology.mesh self.backend = ( data[0].backend if hasattr(data[0], "backend") else topology.backend ) self.host_backend = self.backend.host_array_backend self.base_tag = int(hashlib.sha1(name.encode("utf-8")).hexdigest(), 16) % ( 104729 ) self.exchange_method = exchange_method self.ghost_op = ghost_op self.ghost_mask = ghost_mask self.base_dtype = base_dtype self.base_mpi_type = base_mpi_type = name
[docs] class CartesianDiscreteFieldGhostExchanger(GhostExchanger): def __init__( self, name, topology, data, kind=None, directions=None, ghosts=None, ghost_op=None, ghost_mask=None, exchange_method=None, global_lboundaries_config=None, global_rboundaries_config=None, ): """ By default, we exchange all ghosts, including diagonals. This is done by setting ghost_mask to GhostMask.FULL. Just Cartesian Communicator neighbours are considered here (there is no direct communication between diagonal processes). X P X ^ P < P > P v X P X Diagonal ghosts are exchanged by chaining exchanges on two or more axes. If ghost_mask is set to GhostMask.CROSS, diagonal ghosts are set to NAN to ensure they are not used. Boundary conditions are hidden in the topology parameter: (PERIODIC/PERIODIC) => standard periodic ghost exchange on the domain boundary standard periodic ghost accumulation (XXX/YYY) => symmetric or antisymmetric ghost exchange ghost accumulation is a noop Here XXX and YYY are either HOMOGENEOUS_DIRICHLET or HOMOGENEOUS_NEUMANN. """ gprint( "CartesianGhostExchanger.__init__(name={}, \n topology={}, kind={}, directions={}, ghosts={}, exchange_method={}, ghost_op={}, ghost_mask={}]".format( name, topology.full_tag, kind, directions, ghosts, exchange_method, ghost_op, ghost_mask, ) ) ghost_op = first_not_None(ghost_op, GhostOperation.EXCHANGE) check_instance(ghost_op, GhostOperation) ghost_mask = first_not_None(ghost_mask, GhostMask.FULL) check_instance(ghost_mask, GhostMask) exchange_method = first_not_None( exchange_method, ExchangeMethod.NEIGHBOR_ALL_TO_ALL_V ) check_instance(exchange_method, ExchangeMethod) super().__init__( topology=topology, data=data, name=name, exchange_method=exchange_method, ghost_op=ghost_op, ghost_mask=ghost_mask, ) mesh = self.mesh dim = mesh.dim directions = to_tuple(first_not_None(directions, range(dim))) ghosts = to_tuple(first_not_None(ghosts, mesh.ghosts)) check_instance(topology, CartesianTopologyView) check_instance(directions, tuple, values=int) check_instance(ghosts, tuple, values=int, minval=0) assert len(directions) > 0 assert len(set(directions)) == len(directions) assert len({id(d) for d in data}) == len(data) assert all(0 <= d < dim for d in directions) assert len(ghosts) == dim or len(ghosts) == 1 if len(ghosts) == 1: ghosts = tuple(ghosts[0] if (i in directions) else 0 for i in range(dim)) self.directions = directions self.outer_ghosts = mesh.get_local_outer_ghost_slices( ghosts=ghosts, ghost_mask=ghost_mask ) self.inner_ghosts = mesh.get_local_inner_ghost_slices( ghosts=ghosts, ghost_mask=ghost_mask ) self.boundary_layers = mesh.get_boundary_layer_slices( ghosts=ghosts, ghost_mask=ghost_mask ) self.all_inner_ghost_slices = mesh.get_all_local_inner_ghost_slices( ghosts=ghosts ) self.all_outer_ghost_slices = mesh.get_all_local_outer_ghost_slices( ghosts=ghosts ) self.dim = dim self.ghosts = ghosts # check that enforced boundaries kind matches topology boundaries if global_lboundaries_config is not None: global_lboundaries = np.asarray( tuple(map(lambda x: x.bc, global_lboundaries_config)) ) assert (global_lboundaries == mesh.global_lboundaries).all(), ( global_lboundaries, mesh.global_lboundaries, ) if global_rboundaries_config is not None: global_rboundaries = np.asarray( tuple(map(lambda x: x.bc, global_rboundaries_config)) ) assert (global_rboundaries == mesh.global_rboundaries).all(), ( global_rboundaries, mesh.global_rboundaries, ) self.global_lboundaries_config = global_lboundaries_config self.global_rboundaries_config = global_rboundaries_config kind = first_not_None(kind, topology.backend.kind) if kind == Backend.HOST: for d in data: assert isinstance(d, (np.ndarray, HostArray)), type(d) elif __HAS_OPENCL_BACKEND__ and (kind == Backend.OPENCL): for d in data: assert isinstance(d, (clArray.Array, OpenClArray)), type(d) else: msg = f"Unknown topology array backend kind {kind}." raise ValueError(msg) self.kind = kind self._launcher = None def fmt_slices(slices, prefix="\n "): s = "" for i, (lslcs, rslcs, shape) in enumerate(slices): s += f"{prefix}direction {i} || LEFT: {lslcs} || RIGHT: {rslcs} || SHAPE: {shape}" return s msg = """ TOPOLOGY INFO: dim: {} topology.shape: {} topology.coords: {} topology.lboundary: {} topology.rboundary: {} EXCHANGE INFO kind: {} directions: {} ghosts: {} outer_ghosts: {} inner_ghosts: {} boundary_layers: {} global lboundaries: {} global rboundaries: {} """.format( self.dim, topology.proc_shape, topology.proc_coords, mesh.is_at_left_boundary, mesh.is_at_right_boundary, self.kind, self.directions, self.ghosts, fmt_slices(self.outer_ghosts), fmt_slices(self.inner_ghosts), self.boundary_layers, global_lboundaries_config, global_rboundaries_config, ) gprint(msg)
[docs] def exchange_ghosts(self, **kwds): """Exchange ghosts on specified data.""" if self._launcher is None: self._build_launcher() assert self._launcher is not None if self.kind is Backend.OPENCL: kwds.setdefault("queue", self.backend.cl_env.default_queue) return self._launcher(**kwds)
def _build_launcher(self): if self._launcher is not None: return self._launcher if self.kind is Backend.HOST: self._build_python_launcher() elif self.kind is Backend.OPENCL: self._build_opencl_launcher() else: msg = f"Unknown array backend kind {self.kind}." raise NotImplementedError(msg) assert self._launcher is not None return self._launcher def _prepare_launcher(self): class _LauncherParameters: def __init__(self): # Call kwds depends on exchange method used (Send-Recv, Neighbor_AlltoAll, ...) # SEND-RECV # > one call per direction, per data buffer + (left + right) self.isend_kwds = [] self.irecv_kwds = [] self.i_dst_buffers = [] self.i_src_buffers = [] # NEIGHBOR_ALL_TO_ALL_V # > only one call for all buffers self.v_kwds = None self.v_send_requests = {} self.v_recv_requests = {} self.v_src_buffers = () self.v_dst_buffers = () self.v_send_buffer = None self.v_recv_buffer = None # NEIGHBOR_ALL_TO_ALL_W # > one call per data buffer self.w_kwds = [] self.w_src_buffers = [] self.w_dst_buffers = [] self.w_send_buffer = [] self.w_recv_buffer = [] # COMMON PARAMETERS self.local_exchanges = [] self.local_symmetries = [] self.diagonal_ghosts = [] self.has_mpi_exchanges = False self.from_buffer = None # should source data be bufferized ? self.to_buffer = None # should target data be bufferized ? def setup(self): local_symmetries = [] for ls in self.local_symmetries: (buf, slices, shape, d, to_left, boundary) = ls bc = boundary.bc if bc is BoundaryCondition.HOMOGENEOUS_DIRICHLET: if isinstance(boundary, BoundaryConditionConfig) and ( is not None ): msg = "Boundary of type HOMOGENEOUS_DIRICHLET does not handle custom boundary data, got {}." raise RuntimeError(msg, type( else: fn = LocalBoundaryExchanger.build_antisymmetric_exchanger( shape=shape, direction=d, to_left=to_left ) elif bc is BoundaryCondition.HOMOGENEOUS_NEUMANN: if isinstance(boundary, BoundaryConditionConfig) and ( is not None ): # allow to force boundary ghosts to a specific scalar value if isinstance(, (int, float, np.number)): fn = LocalBoundaryExchanger.build_scalar_exchanger(, shape=shape, direction=d ) else: msg = "Boundary of type HOMOGENEOUS_NEUMANN only handle custom scalar boundary data, got {}." raise RuntimeError(msg, type( else: fn = LocalBoundaryExchanger.build_symmetric_exchanger( shape=shape, direction=d, to_left=to_left ) elif bc is BoundaryCondition.DIRICHLET: if isinstance(, (int, float, np.number)): fn = LocalBoundaryExchanger.build_scalar_exchanger(, shape=shape, direction=d, to_left=to_left, ) else: msg = "Boundary of type DIRICHLET only handle custom scalar boundary data, got {}." raise RuntimeError(msg, type( else: msg = f"Unknown boundary condition {bc}." raise RuntimeError(msg, type(boundary).__name__) local_symmetries.append((fn, buf, slices, shape, d, to_left, bc)) self.local_symmetries = local_symmetries return self def msg_tag(i, local_rank, target_rank, d, direction): tag = self.base_tag tag += (i + 1) * 7919 tag += (local_rank + 1) * 967 tag += (target_rank + 1) * 103 tag += (d + 1) * 97 tag += (direction + 1) * 17 return tag dim = self.dim ghosts = self.ghosts ghost_op = self.ghost_op comm = self.topology.comm local_rank = self.topology.cart_rank neighbour_ranks = self.topology.proc_neighbour_ranks proc_shape = self.topology.proc_shape base_dtype = self.base_dtype base_mpi_type = self.base_mpi_type exchange_method = self.exchange_method global_lboundaries_config = self.global_lboundaries_config global_rboundaries_config = self.global_rboundaries_config mesh = self.topology.mesh is_at_left_boundary = mesh.is_at_left_boundary is_at_right_boundary = mesh.is_at_right_boundary left_boundaries_kind = mesh.local_lboundaries right_boundaries_kind = mesh.local_rboundaries src_data_on_device = self.kind is not Backend.HOST dst_data_on_device = self.kind is not Backend.HOST all_outer_ghost_slices = self.all_outer_ghost_slices lp = _LauncherParameters() assert for i, buf in enumerate( lp.w_send_requests = {} lp.w_recv_requests = {} if ( (ghost_op is GhostOperation.EXCHANGE) and (dim > 1) and (self.ghost_mask is GhostMask.CROSS) ): # set all outer diagonal ghosts to NAN for directions in all_outer_ghost_slices[dim]: if all(ghosts[d] > 0 for d in directions): for displacements in all_outer_ghost_slices[dim][directions]: ndisplacements = sum(d != 0 for d in displacements) if ndisplacements < 2: continue slc, shape = all_outer_ghost_slices[dim][directions][ displacements ] value = default_invalid_value(dtype=base_dtype) lp.diagonal_ghosts.append((buf, slc, shape, value)) assert self.directions assert any(ghosts[d] > 0 for d in self.directions) for d in self.directions: if ghosts[d] == 0: continue lboundary = left_boundaries_kind[d] rboundary = right_boundaries_kind[d] at_left = is_at_left_boundary[d] at_right = is_at_right_boundary[d] lnone = lboundary is BoundaryCondition.NONE rnone = rboundary is BoundaryCondition.NONE lperiodic = lboundary is BoundaryCondition.PERIODIC rperiodic = rboundary is BoundaryCondition.PERIODIC should_exchange_to_left = lnone or lperiodic should_exchange_to_right = rnone or rperiodic assert at_left ^ lnone assert at_right ^ rnone inner_left, inner_right, shape = self.inner_ghosts[d] outer_left, outer_right, shape = self.outer_ghosts[d] left_boundary_layer, right_boundary_layer, bl_shape = ( self.boundary_layers[d] ) assert (left_boundary_layer is None) ^ (at_left and not lperiodic) assert (right_boundary_layer is None) ^ (at_right and not rperiodic) left_rank = neighbour_ranks[0, d] right_rank = neighbour_ranks[1, d] nprocs = proc_shape[d] lp.has_mpi_exchanges |= nprocs > 1 assert ( (nprocs == 1) or should_exchange_to_left or should_exchange_to_right ) msg = """ DATA {} EXCHANGES FOR DIRECTION {}: nprocs (directional procs): {} src_data_on_device: {} dst_data_on_device: {} local_rank: {} left_rank / right_rank: {} / {} at_left / at_right: {} / {} lboundary / rboundary: {} / {} xchg_to_left / xchg_to_right: {} / {} inner_left / inner_right: {} / {} outer_left / outer_right: {} / {} associated_shape: {} left_blayer / right_blayer: {} / {} boundary_layer_shape: {} Send/Receive:""".format( i, d, nprocs, src_data_on_device, dst_data_on_device, local_rank, left_rank, right_rank, at_left, at_right, lboundary, rboundary, should_exchange_to_left, should_exchange_to_right, inner_left, inner_right, outer_left, outer_right, shape, left_boundary_layer, right_boundary_layer, bl_shape, ) gprint(msg) if not should_exchange_to_left: glboundary = ( global_lboundaries_config[d] if (global_lboundaries_config is not None) else lboundary ) assert glboundary.bc is lboundary lp.local_symmetries.append( (buf, left_boundary_layer, bl_shape, d, 1, glboundary) ) if not should_exchange_to_right: grboundary = ( global_rboundaries_config[d] if (global_rboundaries_config is not None) else rboundary ) assert grboundary.bc is rboundary lp.local_symmetries.append( (buf, right_boundary_layer, bl_shape, d, 0, grboundary) ) if nprocs == 1: # We need to exchange with ourselves (by periodicity) assert at_left and at_right assert should_exchange_to_left == should_exchange_to_right should_exchange = should_exchange_to_left if should_exchange: lp.local_exchanges.append( (buf, outer_right, inner_left, shape, d) ) lp.local_exchanges.append( (buf, outer_left, inner_right, shape, d) ) lp.from_buffer = first_not_None(lp.from_buffer, False) lp.to_buffer = first_not_None(lp.to_buffer, False) elif ghost_op is GhostOperation.EXCHANGE: # SEND DIRECTION IS # INNER GHOSTS ---> OUTER GHOSTS # OPERATION IS # OUTER_GHOSTS[...] = INNER_GHOSTS if exchange_method is ExchangeMethod.NEIGHBOR_ALL_TO_ALL_V: # Send and receive every buffer at once to neighbours # /!\ we send and receive all data components at once if nprocs == 2: # switch left and right in 2 proc periodic case outer_right, outer_left = outer_left, outer_right if should_exchange_to_left: lp.v_send_requests.setdefault( ("left", left_rank), [] ).append(buf[inner_left]) lp.v_recv_requests.setdefault( ("left", left_rank), [] ).append(buf[outer_left]) if should_exchange_to_right: lp.v_send_requests.setdefault( ("right", right_rank), [] ).append(buf[inner_right]) lp.v_recv_requests.setdefault( ("right", right_rank), [] ).append(buf[outer_right]) lp.from_buffer = True lp.to_buffer = True elif exchange_method is ExchangeMethod.NEIGHBOR_ALL_TO_ALL_W: # Send and receive to and from every neigbours # /!\ only one buffer at a time if nprocs == 2: # switch left and right in 2 proc periodic case outer_right, outer_left = outer_left, outer_right if should_exchange_to_left: lp.w_send_requests.setdefault( ("left", left_rank), [] ).append((buf, inner_left)) lp.w_recv_requests.setdefault( ("left", left_rank), [] ).append((buf, outer_left)) if should_exchange_to_right: lp.w_send_requests.setdefault( ("right", right_rank), [] ).append((buf, inner_right)) lp.w_recv_requests.setdefault( ("right", right_rank), [] ).append((buf, outer_right)) lp.from_buffer = src_data_on_device lp.to_buffer = dst_data_on_device elif exchange_method is ExchangeMethod.ISEND_IRECV: # Exchanges with left neighour if should_exchange_to_left: # send inner left to left rank assert (left_rank != local_rank) and ( left_rank != -1 ), left_rank sendtag = msg_tag(i, local_rank, left_rank, d, 0) if src_data_on_device: tmp = self.host_backend.empty( shape=shape, dtype=base_dtype ) send_buf = tmp.handle mpi_type = base_mpi_type.Create_contiguous( send_buf.size ) mpi_type.Commit() lp.i_src_buffers += ((tmp, buf, inner_left),) else: send_buf = buf.handle mpi_type = TopoTools.create_subarray_from_buffer( send_buf, inner_left ) send_kwds = { "buf": [send_buf, mpi_type], "dest": left_rank, "tag": sendtag, } lp.isend_kwds.append(send_kwds) msg = "\t\t>P{}.ISend(shape={}, dtype={}, tag={}) inner left data to left neighbour process P{}." msg = msg.format( local_rank, shape, buf.dtype, sendtag, left_rank ) gprint(msg) # receive outer right from left rank recvtag = msg_tag(i, left_rank, local_rank, d, 1) if dst_data_on_device: tmp = self.host_backend.empty( shape=shape, dtype=base_dtype ) recv_buf = tmp.handle mpi_type = base_mpi_type.Create_contiguous( recv_buf.size ) mpi_type.Commit() lp.i_dst_buffers += ((tmp, buf, outer_left),) else: recv_buf = buf.handle mpi_type = TopoTools.create_subarray_from_buffer( recv_buf, outer_left ) recv_kwds = { "buf": [recv_buf, mpi_type], "source": left_rank, "tag": recvtag, } lp.irecv_kwds.append(recv_kwds) msg = "\t\t>P{}.IRecv(shape={}, dtype={}, tag={}) outer left data from left neighbour process P{}." msg = msg.format( local_rank, shape, buf.dtype, recvtag, left_rank ) gprint(msg) if should_exchange_to_right: # Exchanges with right neighour assert (right_rank != local_rank) and (right_rank != -1) # send inner right to right rank sendtag = msg_tag(i, local_rank, right_rank, d, 1) if src_data_on_device: tmp = self.host_backend.empty( shape=shape, dtype=base_dtype ) send_buf = tmp.handle mpi_type = base_mpi_type.Create_contiguous( send_buf.size ) mpi_type.Commit() lp.i_src_buffers += ((tmp, buf, inner_right),) else: send_buf = buf.handle mpi_type = TopoTools.create_subarray_from_buffer( send_buf, inner_right ) send_kwds = { "buf": [send_buf, mpi_type], "dest": right_rank, "tag": sendtag, } lp.isend_kwds.append(send_kwds) msg = "\t\t>P{}.ISend(shape={}, dtype={}, tag={}) inner right data to right neighbour process P{}." msg = msg.format( local_rank, shape, buf.dtype, sendtag, right_rank ) gprint(msg) # receive outer left from right rank recvtag = msg_tag(i, right_rank, local_rank, d, 0) if dst_data_on_device: tmp = self.host_backend.empty( shape=shape, dtype=base_dtype ) recv_buf = tmp.handle mpi_type = base_mpi_type.Create_contiguous( recv_buf.size ) mpi_type.Commit() lp.i_dst_buffers += ((tmp, buf, outer_right),) else: recv_buf = buf.handle mpi_type = TopoTools.create_subarray_from_buffer( recv_buf, outer_right ) recv_kwds = { "buf": [recv_buf, mpi_type], "source": right_rank, "tag": recvtag, } lp.irecv_kwds.append(recv_kwds) msg = "\t\t>P{}.IRecv(shape={}, dtype={}, tag={}) outer right data from right neighbour process P{}." msg = msg.format( local_rank, shape, buf.dtype, recvtag, right_rank ) gprint(msg) lp.from_buffer = src_data_on_device lp.to_buffer = dst_data_on_device else: msg = f"Unknown MPI exchange method {exchange_method}." raise NotImplementedError(msg) elif ghost_op in (GhostOperation.ACCUMULATE,): # FOR PERIODIC OR NONE BOUNDARIES: # SEND DIRECTION IS # OUTER GHOSTS ---> TMP BUFFER # OPERATION IS # INNER GHOSTS = OP(INNER_GHOSTS, OUTER_GHOSTS) # ELSE OPERATION IS # OUTER_GHOSTS[...] = 0 if exchange_method is ExchangeMethod.NEIGHBOR_ALL_TO_ALL_V: # Send and receive every buffer at once to neighbours # /!\ we send and receive all data components at once left_rank = neighbour_ranks[0, d] right_rank = neighbour_ranks[1, d] if nprocs == 2: # switch left and right in 2 proc periodic case inner_right, inner_left = inner_left, inner_right if should_exchange_to_left: lp.v_send_requests.setdefault( ("left", left_rank), [] ).append(buf[outer_left]) lp.v_recv_requests.setdefault( ("left", left_rank), [] ).append(buf[inner_left]) if should_exchange_to_right: lp.v_send_requests.setdefault( ("right", right_rank), [] ).append(buf[outer_right]) lp.v_recv_requests.setdefault( ("right", right_rank), [] ).append(buf[inner_right]) lp.from_buffer = True lp.to_buffer = True elif exchange_method is ExchangeMethod.NEIGHBOR_ALL_TO_ALL_W: # Send and receive to and from every neigbours # /!\ only one buffer at a time if nprocs == 2: # switch left and right in 2 proc periodic case inner_right, inner_left = inner_left, inner_right left_rank = neighbour_ranks[0, d] right_rank = neighbour_ranks[1, d] if should_exchange_to_left: lp.w_send_requests.setdefault( ("left", left_rank), [] ).append((buf, outer_left)) lp.w_recv_requests.setdefault( ("left", left_rank), [] ).append((buf, inner_left)) if should_exchange_to_right: lp.w_send_requests.setdefault( ("right", right_rank), [] ).append((buf, outer_right)) lp.w_recv_requests.setdefault( ("right", right_rank), [] ).append((buf, inner_right)) lp.from_buffer = src_data_on_device lp.to_buffer = True elif exchange_method is ExchangeMethod.ISEND_IRECV: if should_exchange_to_left: # Exchanges with left neighour left_rank = neighbour_ranks[0, d] assert (left_rank != local_rank) and (left_rank != -1) # send outer left to left rank sendtag = msg_tag(i, local_rank, left_rank, d, 0) if src_data_on_device: tmp = self.host_backend.empty( shape=shape, dtype=base_dtype ) send_buf = tmp.handle mpi_type = base_mpi_type.Create_contiguous( send_buf.size ) mpi_type.Commit() lp.i_src_buffers += ((tmp, buf, outer_left),) else: send_buf = buf.handle mpi_type = TopoTools.create_subarray_from_buffer( send_buf, outer_left ) assert all(send_buf[outer_left].shape == shape) send_kwds = { "buf": [send_buf, mpi_type], "dest": left_rank, "tag": sendtag, } lp.isend_kwds.append(send_kwds) msg = "\t\t>P{}.ISend(shape={}, dtype={}, tag={}) outer left data to left neighbour process P{}." msg = msg.format( local_rank, shape, send_buf.dtype, sendtag, left_rank ) gprint(msg) # receive outer right ghosts data from left rank in a tmp buffer recvtag = msg_tag(i, left_rank, local_rank, d, 1) mpi_type = base_mpi_type.Create_contiguous( buf[inner_left].size ) mpi_type.Commit() tmp = self.host_backend.empty_like( send_buf, shape=shape, dtype=base_dtype ) recv_kwds = { "buf": [tmp.handle, mpi_type], "source": left_rank, "tag": recvtag, } lp.irecv_kwds.append(recv_kwds) lp.i_dst_buffers.append((tmp, buf, inner_left)) msg = "\t\t>P{}.IRecv(shape={}, dtype={}, tag={}) inner left data from left neighbour process P{}." msg = msg.format( local_rank, shape, tmp.dtype, recvtag, left_rank ) gprint(msg) if should_exchange_to_right: # Exchanges with right neighour right_rank = neighbour_ranks[1, d] assert (right_rank != local_rank) and (right_rank != -1) # send outer right to right rank sendtag = msg_tag(i, local_rank, right_rank, d, 1) if src_data_on_device: tmp = self.host_backend.empty( shape=shape, dtype=base_dtype ) send_buf = tmp.handle mpi_type = base_mpi_type.Create_contiguous( send_buf.size ) mpi_type.Commit() lp.i_src_buffers += ((tmp, buf, outer_right),) else: send_buf = buf.handle mpi_type = TopoTools.create_subarray_from_buffer( send_buf, outer_right ) assert all(send_buf[outer_right].shape == shape) send_kwds = { "buf": [send_buf, mpi_type], "dest": right_rank, "tag": sendtag, } lp.isend_kwds.append(send_kwds) msg = "\t\t>P{}.ISend(shape={}, dtype={}, tag={}) outer right data to right neighbour process P{}." msg = msg.format( local_rank, shape, send_buf.dtype, sendtag, right_rank ) gprint(msg) # receive outer left ghosts data from right rank in a tmp buffer recvtag = msg_tag(i, right_rank, local_rank, d, 0) mpi_type = base_mpi_type.Create_contiguous( buf[inner_right].size ) mpi_type.Commit() tmp = self.host_backend.empty(shape=shape, dtype=base_dtype) recv_kwds = { "buf": [tmp.handle, mpi_type], "source": right_rank, "tag": recvtag, } lp.irecv_kwds.append(recv_kwds) lp.i_dst_buffers.append((tmp, buf, inner_right)) msg = "\t\t>P{}.IRecv(shape={}, dtype={}, tag={}) inner right data from right neighbour process P{}." msg = msg.format( local_rank, shape, tmp.dtype, recvtag, right_rank ) gprint(msg) lp.from_buffer = src_data_on_device lp.to_buffer = True else: msg = f"Unknown MPI exchange method {exchange_method}." raise NotImplementedError(msg) else: msg = f"Unknown GhostOperation {ghost_op}." raise NotImplementedError(msg) msg = "Something went wrong while initializing LauncherParameters::{}::{}, got value {{}}." msg = msg.format(ghost_op, exchange_method) assert lp.from_buffer is not None, msg.format(lp.from_buffer) assert lp.to_buffer is not None, msg.format(lp.to_buffer) # handle all_to_all_w kwds (one call for all neighbours per buffer) if exchange_method is ExchangeMethod.NEIGHBOR_ALL_TO_ALL_W: sendtypes, recvtypes = (), () # subarray types sendcounts, recvcounts = (), () # in type counts sdispls, rdispls = (), () # in bytes sdispl, rdispl = 0, 0 scount, rcount = 0, 0 src_buffers, dst_buffers = (), () for direction in range(comm.Get_dim()): for tag, rank in zip(("left", "right"), comm.Shift(direction, 1)): displ = 0 if (tag, rank) in lp.w_send_requests: requests = lp.w_send_requests[(tag, rank)] assert len(requests) == 1 (src, slc) = requests[0] assert src is buf assert src.dtype == base_dtype if lp.from_buffer: mpi_type = base_mpi_type send_count = src[slc].size displ = src[slc].nbytes vslc = slice(scount, scount + send_count) src_buffers += ((src[slc], vslc),) else: mpi_type = TopoTools.create_subarray_from_buffer( src, slc ) send_count = 1 else: mpi_type = base_mpi_type send_count = 0 sendtypes += (mpi_type,) sendcounts += (send_count,) sdispls += (sdispl,) sdispl += displ scount += send_count displ = 0 if (tag, rank) in lp.w_recv_requests: requests = lp.w_recv_requests[(tag, rank)] assert len(requests) == 1 (dst, slc) = requests[0] assert dst is buf assert dst.dtype == base_dtype if lp.to_buffer: mpi_type = base_mpi_type recv_count = dst[slc].size displ = dst[slc].nbytes vslc = slice(rcount, rcount + recv_count) dst_buffers += ((dst[slc], vslc),) else: mpi_type = TopoTools.create_subarray_from_buffer( dst, slc ) recv_count = 1 else: mpi_type = base_mpi_type recv_count = 0 recvtypes += (mpi_type,) recvcounts += (recv_count,) rdispls += (rdispl,) rdispl += displ rcount += recv_count if lp.from_buffer: assert scount > 0, scount send_size = scount send_buffer = self.host_backend.empty( shape=(send_size,), dtype=base_dtype ) sendbuf = send_buffer.handle else: send_buffer = None sendbuf = buf.handle if lp.to_buffer: assert rcount > 0, rcount recv_size = rcount recv_buffer = self.host_backend.empty( shape=(recv_size,), dtype=base_dtype ) recvbuf = recv_buffer.handle else: recv_buffer = None recvbuf = buf.handle lp.w_src_buffers.append(src_buffers) lp.w_send_buffer.append(send_buffer) lp.w_dst_buffers.append(dst_buffers) lp.w_recv_buffer.append(recv_buffer) lp.w_kwds.append( { "sendbuf": [sendbuf, sendcounts, sdispls, sendtypes], "recvbuf": [recvbuf, recvcounts, rdispls, recvtypes], } ) # handle all_to_all_v kwds (one call for all buffers and all neighbours) if lp.has_mpi_exchanges and ( exchange_method is ExchangeMethod.NEIGHBOR_ALL_TO_ALL_V ): assert lp.v_send_requests assert lp.v_recv_requests sendtype, recvtype = base_mpi_type, base_mpi_type sendcounts, recvcounts = (), () # in type counts sdispls, rdispls = (), () # in type counts send_disp, recv_disp = 0, 0 # in type counts src_buffers, dst_buffers = (), () send_buffer, recv_buffer = None, None for direction in range(comm.Get_dim()): for tag, rank in zip(("left", "right"), comm.Shift(direction, 1)): send_count, recv_count = 0, 0 if (tag, rank) in lp.v_send_requests: for src in lp.v_send_requests[(tag, rank)]: assert src.dtype == base_dtype slc = slice( send_disp + send_count, send_disp + send_count + src.size, ) src_buffers += ((src, (slc,)),) send_count += src.size sendcounts += (send_count,) sdispls += (send_disp,) send_disp += send_count if (tag, rank) in lp.v_recv_requests: for dst in lp.v_recv_requests[(tag, rank)]: assert dst.dtype == base_dtype slc = slice( recv_disp + recv_count, recv_disp + recv_count + dst.size, ) dst_buffers += ((dst, (slc,)),) recv_count += dst.size recvcounts += (recv_count,) rdispls += (recv_disp,) recv_disp += recv_count assert send_disp == sum(sendcounts) > 0, send_disp send_size = send_disp send_buffer = self.host_backend.empty(shape=(send_size,), dtype=base_dtype) assert recv_disp == sum(recvcounts) > 0, recv_disp recv_size = recv_disp recv_buffer = self.host_backend.empty(shape=(recv_size,), dtype=base_dtype) lp.v_kwds = { "sendbuf": [send_buffer.handle, sendcounts, sdispls, sendtype], "recvbuf": [recv_buffer.handle, recvcounts, rdispls, recvtype], } lp.v_src_buffers = src_buffers lp.v_dst_buffers = dst_buffers lp.v_send_buffer = send_buffer lp.v_recv_buffer = recv_buffer return lp.setup() def _build_python_launcher(self): assert self._launcher is None ghost_op = self.ghost_op exchange_method = self.exchange_method comm = self.topology.comm base_dtype = self.base_dtype lp = self._prepare_launcher() if ghost_op is GhostOperation.EXCHANGE: # Warning: in EXCHANGE operation: # cannot overlap comm with local exchanges (because of edges and corners) # mpi buffers are the only buffer where all exchanges take place def local_exchanges(lp=lp): for fn, buf, slices, _, _, _, _ in lp.local_symmetries: gprint(" local_symmetry") fn(buf[slices]) for buf, outer, inner, shape, direction in lp.local_exchanges: gprint(" local_exchange") buf[outer] = buf[inner] for buf, slc, shape, val in lp.diagonal_ghosts: gprint(" diagonal ghosts") buf[slc] = val if lp.has_mpi_exchanges: if exchange_method is ExchangeMethod.NEIGHBOR_ALL_TO_ALL_W: assert len(lp.w_kwds) > 0 assert lp.from_buffer is False assert lp.to_buffer is False def python_launcher(lp=lp): evts = [] for kwds in lp.w_kwds: evt = comm.Ineighbor_alltoallw(**kwds) evts.append(evt) MPI.Request.Waitall(evts) local_exchanges() # perform local exchanges after comm end elif exchange_method is ExchangeMethod.NEIGHBOR_ALL_TO_ALL_V: assert lp.v_kwds assert lp.from_buffer is True, lp.from_buffer assert lp.to_buffer is True, lp.to_buffer def python_launcher(lp=lp): for src, slc in lp.v_src_buffers: lp.v_send_buffer[slc] = src.ravel() evt = comm.Ineighbor_alltoallv(**lp.v_kwds) evt.wait() for dst, slc in lp.v_dst_buffers: dst[...] = lp.v_recv_buffer[slc].reshape(dst.shape) local_exchanges() # perform local exchanges after comm end elif exchange_method is ExchangeMethod.ISEND_IRECV: assert lp.irecv_kwds assert lp.isend_kwds assert lp.from_buffer is False assert lp.to_buffer is False def python_launcher(lp=lp): evts = [] for recv_kwds in lp.irecv_kwds: gprint( " Irecv(source={}, tag={})".format( recv_kwds["source"], recv_kwds["tag"] ) ) evt = comm.Irecv(**recv_kwds) evts.append(evt) for send_kwds in lp.isend_kwds: gprint( " Isend(dest={}, tag={})".format( send_kwds["dest"], send_kwds["tag"] ) ) evt = comm.Isend(**send_kwds) evts.append(evt) MPI.Request.Waitall(evts) local_exchanges() # perform local exchanges after comm end else: msg = f"Unknown MPI exchange method {exchange_method}." raise NotImplementedError(msg) else: python_launcher = local_exchanges elif ghost_op is GhostOperation.ACCUMULATE: # Warning: in ACCUMULATE operations: # can overlap mpi comm with local exchanges because mpi uses distinct buffers def local_exchanges(lp=lp): for buf, outer, inner, shape, direction in lp.local_exchanges: buf[inner] += buf[outer] if lp.has_mpi_exchanges: if exchange_method is ExchangeMethod.NEIGHBOR_ALL_TO_ALL_W: assert len(lp.w_kwds) > 0 assert lp.from_buffer is False assert lp.to_buffer is True def python_launcher(lp=lp): evts = [] for kwds, send_buffer, src_buffers in zip( lp.w_kwds, lp.w_send_buffer, lp.w_src_buffers ): for src, slc in src_buffers: send_buffer[slc] = src.ravel() evt = comm.Ineighbor_alltoallw(**kwds) evts.append(evt) local_exchanges() for idx in iter_mpi_requests(evts): for dst, slc in lp.w_dst_buffers[idx]: dst[...] += lp.w_recv_buffer[idx][slc].reshape( dst.shape ) elif exchange_method is ExchangeMethod.NEIGHBOR_ALL_TO_ALL_V: assert lp.v_kwds assert lp.from_buffer is True assert lp.to_buffer is True def python_launcher(lp=lp): for src, slc in lp.v_src_buffers: lp.v_send_buffer[slc] = src.ravel() evt = comm.Ineighbor_alltoallv(**lp.v_kwds) local_exchanges() evt.wait() for dst, slc in lp.v_dst_buffers: dst[...] += lp.v_recv_buffer[slc].reshape(dst.shape) elif exchange_method is ExchangeMethod.ISEND_IRECV: assert lp.from_buffer is False assert lp.to_buffer is True assert len(lp.irecv_kwds) == len(lp.isend_kwds) > 0 def python_launcher(lp=lp): send_evts = [] recv_evts = [] for recv_kwds in lp.irecv_kwds: gprint( " Irecv(source={}, tag={})".format( recv_kwds["source"], recv_kwds["tag"] ) ) gprint_buffer( " >receiving", recv_kwds["buf"], no_data=True ) evt = comm.Irecv(**recv_kwds) recv_evts.append(evt) for send_kwds in lp.isend_kwds: gprint( " Isend(dest={}, tag={})".format( send_kwds["dest"], send_kwds["tag"] ) ) gprint_buffer(" >sending", send_kwds["buf"]) evt = comm.Isend(**send_kwds) send_evts.append(evt) local_exchanges() for idx in iter_mpi_requests(recv_evts): (tmp, buf, inner) = lp.i_dst_buffers[idx] gprint_buffer(" Received", tmp) gprint_buffer(" Summing to", buf[inner]) buf[inner] += tmp gprint_buffer(" Result is", buf[inner]) MPI.Request.Waitall(send_evts) else: msg = f"Unknown MPI exchange method {exchange_method}." raise NotImplementedError(msg) else: python_launcher = local_exchanges else: msg = f"Unknown GhostOperation {ghost_op}." raise NotImplementedError(msg) self._launcher = python_launcher def _build_opencl_launcher(self): assert self._launcher is None from hysop.backend.device.opencl.opencl_kernel_launcher import ( OpenClKernelListLauncher, HostLauncherI, ) from hysop.backend.device.opencl.opencl_copy_kernel_launchers import ( OpenClCopyBufferRectLauncher, ) class FunctionLauncher(HostLauncherI): def __init__(self, name, fn): super().__init__(name=name) self._fn = fn def __call__(self, *args, **kwds): super().__call__() return self._fn(*args, **kwds) class MPIGhostExchangeLauncher(FunctionLauncher): def __init__(self, fn): super().__init__(name="MPI_Ghost_Exchange_Launcher", fn=fn) lp = self._prepare_launcher() dim = self.dim ghost_op = self.ghost_op exchange_method = self.exchange_method comm = self.topology.comm base_dtype = self.base_dtype # generate the minimal number of temporary backend buffers tmp_buffers = {} def mk_tmp(shape, dtype, color=0, host=False): nbytes =, dtype=np.int64) * dtype.itemsize key = (host, nbytes, color) if key in tmp_buffers: tmp = tmp_buffers[key].reshape(shape).view(dtype=dtype) elif host: tmp = self.host_backend.empty(shape=shape, dtype=dtype) tmp_buffers[key] = tmp else: tmp = self.backend.empty(shape=shape, dtype=dtype) tmp_buffers[key] = tmp return tmp if ghost_op is GhostOperation.EXCHANGE: name = f"local_ghosts_exchanges_{}" local_kl = OpenClKernelListLauncher(name=name) for fn, buf, slices, shape, direction, to_left, bc in lp.local_symmetries: # SYMMETRIC OR ANTISYMMETRIC LOCAL EXCHANGE dirlabel = DirectionLabels[dim - direction - 1] vname = "{}_{}_{}_{}".format(, dirlabel, "left" if to_left else "right", str(bc.bc).lower(), ) tmp = mk_tmp(shape=shape, dtype=base_dtype, host=True) k0 = OpenClCopyBufferRectLauncher.from_slices( varname=vname + "_boundary_layer", src=buf, src_slices=slices, dst=tmp, ) k1 = OpenClCopyBufferRectLauncher.from_slices( varname=vname + "_boundary_layer", src=tmp, dst=buf, dst_slices=slices, ) def apply_fn(f=fn, X=tmp, k0=k0, k1=k1, **kwds): evt = k0(**kwds) evt.wait() f(X) evt = k1(**kwds) return evt local_kl += FunctionLauncher(vname + "_apply_on_host", apply_fn) for i, (buf, outer_slc, inner_slc, shape, direction) in enumerate( lp.local_exchanges ): # PERIODIC LOCAL EXCHANGE dirlabel = DirectionLabels[dim - direction - 1] vname = f"{}_{i}_{dirlabel}" # some opencl platforms reject inplace buffer copies # so we use a tmp buffer to perform local ghost exchanges tmp = mk_tmp(shape=shape, dtype=base_dtype) # exchange all left inner ghosts to right outer ghosts local_kl += OpenClCopyBufferRectLauncher.from_slices( varname=vname + "_inner_to_tmp_ghost_buffer", src=buf, src_slices=inner_slc, dst=tmp, ) local_kl += OpenClCopyBufferRectLauncher.from_slices( varname=vname + "_tmp_ghost_buffer_to_outer", src=tmp, dst=buf, dst_slices=outer_slc, ) for i, (buf, slc, shape, value) in enumerate(lp.diagonal_ghosts): vname = f"{}_diagonal_{i}" tmp = self.backend.empty(shape=shape, dtype=base_dtype) tmp.fill(value) local_kl += OpenClCopyBufferRectLauncher.from_slices( varname=vname, src=tmp, dst=buf, dst_slices=slc ) kl = OpenClKernelListLauncher(name=f"exchange_ghosts_{}") if lp.has_mpi_exchanges: assert lp.from_buffer is True assert lp.to_buffer is True if exchange_method is ExchangeMethod.NEIGHBOR_ALL_TO_ALL_W: assert len(lp.w_kwds) > 0 d2h_kls = [] h2d_kls = [] for i, (send_buffer, src_buffers) in enumerate( zip(lp.w_send_buffer, lp.w_src_buffers) ): d2h_kl_i = OpenClKernelListLauncher( f"inner_ghosts_D2H_exchanges_{}_{i}" ) for j, (src, dst_slc) in enumerate(src_buffers): vname = f"{}_{i}_{j}" dst = send_buffer[dst_slc].reshape(src.shape) d2h_kl_i += OpenClCopyBufferRectLauncher.from_slices( varname=vname + "_inner_ghosts_D2H", src=src, src_slices=None, dst=dst, dst_slices=None, ) d2h_kls.append(d2h_kl_i) for i, (recv_buffer, dst_buffers) in enumerate( zip(lp.w_recv_buffer, lp.w_dst_buffers) ): h2d_kl_i = OpenClKernelListLauncher( f"outer_ghosts_H2D_exchanges_{}_{i}" ) for j, (dst, src_slc) in enumerate(dst_buffers): src = recv_buffer[src_slc].reshape(dst.shape) h2d_kl_i += OpenClCopyBufferRectLauncher.from_slices( varname=vname + "_outer_ghosts_H2D", src=src, src_slices=None, dst=dst, dst_slices=None, ) h2d_kls.append(h2d_kl_i) def opencl_launcher(**kwds): d2h_evts, mpi_evts = [], [] for kl in d2h_kls: evt = kl(**kwds) d2h_evts.append(evt) local_kl(**kwds) for evt, w_kwds in zip(d2h_evts, lp.w_kwds): evt.wait() mpi_evt = comm.Ineighbor_alltoallw(**w_kwds) mpi_evts.append(mpi_evt) for idx in iter_mpi_requests(mpi_evts): evt = h2d_kls[idx](**kwds) return evt kl += MPIGhostExchangeLauncher(opencl_launcher) elif exchange_method is ExchangeMethod.NEIGHBOR_ALL_TO_ALL_V: assert lp.v_kwds d2h_kl = OpenClKernelListLauncher( f"inner_ghosts_D2H_exchanges_{}" ) h2d_kl = OpenClKernelListLauncher( f"outer_ghosts_H2D_exchanges_{}" ) for i, (src, dst_slc) in enumerate(lp.v_src_buffers): vname = f"{}_{i}" dst = lp.v_send_buffer[dst_slc].reshape(src.shape) d2h_kl += OpenClCopyBufferRectLauncher.from_slices( varname=vname + "_inner_ghosts_D2H", src=src, src_slices=None, dst=dst, dst_slices=None, ) for i, (dst, src_slc) in enumerate(lp.v_dst_buffers): vname = f"{}_{i}" src = lp.v_recv_buffer[src_slc].reshape(dst.shape) h2d_kl += OpenClCopyBufferRectLauncher.from_slices( varname=vname + "_outer_ghosts_H2D", src=src, src_slices=None, dst=dst, dst_slices=None, ) def opencl_launcher(**kwds): evt = d2h_kl(**kwds) evt.wait() mpi_evt = comm.Ineighbor_alltoallv(**lp.v_kwds) local_kl(**kwds) mpi_evt.wait() evt = h2d_kl(**kwds) return evt kl += MPIGhostExchangeLauncher(opencl_launcher) elif exchange_method is ExchangeMethod.ISEND_IRECV: assert len(lp.i_src_buffers) == len(lp.i_dst_buffers) > 0 assert len(lp.isend_kwds) == len(lp.irecv_kwds) > 0 assert len(lp.isend_kwds) == len(lp.i_src_buffers) d2h_kls = [] h2d_kls = [] for i, (dst, src, src_slc) in enumerate(lp.i_src_buffers): vname = f"{}_{i}" d2h_kl = OpenClCopyBufferRectLauncher.from_slices( varname=vname + "_inner_ghosts_D2H", src=src, src_slices=src_slc, dst=dst, dst_slices=None, ) d2h_kls.append(d2h_kl) for i, (src, dst, dst_slc) in enumerate(lp.i_dst_buffers): vname = f"{}_{i}" h2d_kl = OpenClCopyBufferRectLauncher.from_slices( varname=vname + "_outer_ghosts_H2D", src=src, src_slices=None, dst=dst, dst_slices=dst_slc, ) h2d_kls.append(h2d_kl) def opencl_launcher(**kwds): d2h_evts, mpi_send_evts, mpi_recv_evts = [], [], [] for kl in d2h_kls: evt = kl(**kwds) d2h_evts.append(evt) local_kl(**kwds) for recv_kwds in lp.irecv_kwds: mpi_evt = comm.Irecv(**recv_kwds) mpi_recv_evts.append(mpi_evt) for evt, send_kwds in zip(d2h_evts, lp.isend_kwds): evt.wait() mpi_evt = comm.Isend(**send_kwds) mpi_send_evts.append(mpi_evt) for idx in iter_mpi_requests(mpi_recv_evts): evt = h2d_kls[idx](**kwds) MPI.Request.Waitall(mpi_send_evts) return evt kl += MPIGhostExchangeLauncher(opencl_launcher) else: msg = f"Unknown MPI exchange method {exchange_method}." raise NotImplementedError(msg) else: kl += local_kl elif ghost_op is GhostOperation.ACCUMULATE: name = f"local_ghosts_accumulate_{}" local_kl = OpenClKernelListLauncher(name=name) for i, (buf, outer_slc, inner_slc, shape, direction) in enumerate( lp.local_exchanges ): dirlabel = DirectionLabels[dim - direction - 1] vname = f"{}_{i}_{dirlabel}" ltmp = mk_tmp(shape=shape, dtype=base_dtype, color=0) rtmp = mk_tmp(shape=shape, dtype=base_dtype, color=1) # accumulate outer ghosts to inner ghosts local_kl += OpenClCopyBufferRectLauncher.from_slices( varname=vname + "_inner_to_tmp_ghost_buffer_0", src=buf, src_slices=inner_slc, dst=ltmp, ) local_kl += OpenClCopyBufferRectLauncher.from_slices( varname=vname + "_outer_to_tmp_ghost_buffer_1", src=buf, src_slices=outer_slc, dst=rtmp, ) local_kl += self.backend.add( x1=ltmp, x2=rtmp, out=ltmp, build_kernel_launcher=True ) local_kl += OpenClCopyBufferRectLauncher.from_slices( varname=vname + "_accumulation_to_inner", src=ltmp, dst=buf, dst_slices=inner_slc, ) kl = OpenClKernelListLauncher(name=f"accumulate_ghosts_{}") if lp.has_mpi_exchanges: if exchange_method is ExchangeMethod.NEIGHBOR_ALL_TO_ALL_W: assert len(lp.w_kwds) > 0 d2h_kls = [] h2d_kls = [] for i, (send_buffer, src_buffers) in enumerate( zip(lp.w_send_buffer, lp.w_src_buffers) ): d2h_kl_i = OpenClKernelListLauncher( f"outer_ghosts_D2H_exchanges_{}_{i}" ) for j, (src, dst_slc) in enumerate(src_buffers): vname = f"{}_{i}_{j}" dst = send_buffer[dst_slc].reshape(src.shape) d2h_kl_i += OpenClCopyBufferRectLauncher.from_slices( varname=vname + "_inner_ghosts_D2H", src=src, src_slices=None, dst=dst, dst_slices=None, ) d2h_kls.append(d2h_kl_i) for i, (recv_buffer, dst_buffers) in enumerate( zip(lp.w_recv_buffer, lp.w_dst_buffers) ): h2d_kl_i = OpenClKernelListLauncher( f"inner_ghosts_H2D_exchanges_{}_{i}" ) for j, (dst, src_slc) in enumerate(dst_buffers): vname = f"{}_{i}_{j}" shape = dst.shape outer = recv_buffer[src_slc].reshape(shape) inner = dst ltmp = mk_tmp(shape=shape, dtype=base_dtype, color=0) rtmp = mk_tmp(shape=shape, dtype=base_dtype, color=1) h2d_kl_i += OpenClCopyBufferRectLauncher.from_slices( varname=vname + "_outer_to_tmp_ghost_buffer_0", src=outer, src_slices=None, dst=ltmp, dst_slices=None, ) h2d_kl_i += OpenClCopyBufferRectLauncher.from_slices( varname=vname + "_inner_to_tmp_ghost_buffer_1", src=inner, src_slices=None, dst=rtmp, dst_slices=None, ) h2d_kl_i += self.backend.add( x1=ltmp, x2=rtmp, out=ltmp, build_kernel_launcher=True ) h2d_kl_i += OpenClCopyBufferRectLauncher.from_slices( varname=vname + "_accumulation_to_inner", src=ltmp, dst=inner, ) h2d_kls.append(h2d_kl_i) def opencl_launcher(**kwds): d2h_evts, mpi_evts = [], [] for kl in d2h_kls: evt = kl(**kwds) d2h_evts.append(evt) local_kl(**kwds) for evt, w_kwds in zip(d2h_evts, lp.w_kwds): evt.wait() mpi_evt = comm.Ineighbor_alltoallw(**w_kwds) mpi_evts.append(mpi_evt) for idx in iter_mpi_requests(mpi_evts): evt = h2d_kls[idx](**kwds) return evt kl += MPIGhostExchangeLauncher(opencl_launcher) elif exchange_method is ExchangeMethod.NEIGHBOR_ALL_TO_ALL_V: assert lp.v_kwds assert lp.from_buffer is True assert lp.to_buffer is True d2h_kl = OpenClKernelListLauncher( f"inner_ghosts_D2H_exchanges_{}" ) h2d_kl = OpenClKernelListLauncher( f"outer_ghosts_H2D_exchanges_{}" ) for i, (src, dst_slc) in enumerate(lp.v_src_buffers): vname = f"{}_{i}" dst = lp.v_send_buffer[dst_slc].reshape(src.shape) d2h_kl += OpenClCopyBufferRectLauncher.from_slices( varname=vname + "_inner_ghosts_D2H", src=src, src_slices=None, dst=dst, dst_slices=None, ) for i, (dst, src_slc) in enumerate(lp.v_dst_buffers): vname = f"{}_{i}" shape = dst.shape outer = lp.v_recv_buffer[src_slc].reshape(shape) inner = dst ltmp = mk_tmp(shape=shape, dtype=base_dtype, color=0) rtmp = mk_tmp(shape=shape, dtype=base_dtype, color=1) h2d_kl += OpenClCopyBufferRectLauncher.from_slices( varname=vname + "_outer_to_tmp_ghost_buffer_0", src=outer, src_slices=None, dst=ltmp, dst_slices=None, ) h2d_kl += OpenClCopyBufferRectLauncher.from_slices( varname=vname + "_inner_to_tmp_ghost_buffer_1", src=inner, src_slices=None, dst=rtmp, dst_slices=None, ) h2d_kl += self.backend.add( x1=ltmp, x2=rtmp, out=ltmp, build_kernel_launcher=True ) h2d_kl += OpenClCopyBufferRectLauncher.from_slices( varname=vname + "_accumulation_to_inner", src=ltmp, dst=inner, ) def opencl_launcher(**kwds): evt = d2h_kl(**kwds) evt.wait() mpi_evt = comm.Ineighbor_alltoallv(**lp.v_kwds) local_kl(**kwds) mpi_evt.wait() evt = h2d_kl(**kwds) return evt kl += MPIGhostExchangeLauncher(opencl_launcher) elif exchange_method is ExchangeMethod.ISEND_IRECV: assert len(lp.i_src_buffers) == len(lp.i_dst_buffers) > 0 assert len(lp.isend_kwds) == len(lp.irecv_kwds) > 0 assert len(lp.isend_kwds) == len(lp.i_src_buffers) d2h_kls = [] h2d_kls = [] for i, (dst, src, src_slc) in enumerate(lp.i_src_buffers): vname = f"{}_{i}" d2h_kl = OpenClCopyBufferRectLauncher.from_slices( varname=vname + "_outer_ghosts_D2H", src=src, src_slices=src_slc, dst=dst, dst_slices=None, ) d2h_kls.append(d2h_kl) for i, (src, dst, dst_slc) in enumerate(lp.i_dst_buffers): shape = src.shape outer = src inner = dst[dst_slc] ltmp = mk_tmp(shape=shape, dtype=base_dtype, color=0) rtmp = mk_tmp(shape=shape, dtype=base_dtype, color=1) vname = f"{}_{i}" h2d_kl = OpenClKernelListLauncher( f"outer_ghosts_H2D_exchanges_{}_{i}" ) h2d_kl += OpenClCopyBufferRectLauncher.from_slices( varname=vname + "_outer_to_tmp_ghost_buffer_0", src=outer, src_slices=None, dst=ltmp, dst_slices=None, ) h2d_kl += OpenClCopyBufferRectLauncher.from_slices( varname=vname + "_inner_to_tmp_ghost_buffer_1", src=inner, src_slices=None, dst=rtmp, dst_slices=None, ) h2d_kl += self.backend.add( x1=ltmp, x2=rtmp, out=ltmp, build_kernel_launcher=True ) h2d_kl += OpenClCopyBufferRectLauncher.from_slices( varname=vname + "_accumulation_to_inner", src=ltmp, dst=inner, ) h2d_kls.append(h2d_kl) def opencl_launcher(**kwds): d2h_evts, mpi_send_evts, mpi_recv_evts = [], [], [] for kl in d2h_kls: evt = kl(**kwds) d2h_evts.append(evt) local_kl(**kwds) for recv_kwds in lp.irecv_kwds: mpi_evt = comm.Irecv(**recv_kwds) mpi_recv_evts.append(mpi_evt) for evt, send_kwds in zip(d2h_evts, lp.isend_kwds): evt.wait() mpi_evt = comm.Isend(**send_kwds) mpi_send_evts.append(mpi_evt) for idx in iter_mpi_requests(mpi_recv_evts): evt = h2d_kls[idx](**kwds) MPI.Request.Waitall(mpi_send_evts) return evt kl += MPIGhostExchangeLauncher(opencl_launcher) else: msg = f"Unknown MPI exchange method {exchange_method}." raise NotImplementedError(msg) else: kl += local_kl self._launcher = kl